{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Introduction\n",
    "\n",
    "\n",
    "In this notebook, we load the data generated earlier with the [RandomDataGenerator](https://github.com/abulbasar/RandomDataGenerator) script. Before running it, create the schema in Cassandra using [this script](https://github.com/abulbasar/pyspark-examples/blob/master/kafka-clients/cassandra_schema.cql). After loading the data into Cassandra tables, we write a few queries to derive behavioural characteristics: we compute the average and standard deviation of the amount spent by each customer and, based on those, define a pair of thresholds. We then use the thresholds to decide which transactions are anomalous. The same anomaly detection can be applied in a batch process or in a streaming application."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a Spark session."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://10.0.2.15:4040\n"
     ]
    }
   ],
   "source": [
    "import sys, glob, os\n",
    "SPARK_HOME=os.environ['SPARK_HOME']\n",
    "sys.path.append(SPARK_HOME + \"/python\")\n",
    "sys.path.append(glob.glob(SPARK_HOME + \"/python/lib/py4j*.zip\")[0])\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.conf import SparkConf \n",
    "\n",
    "\n",
    "cassandra_host = \"localhost\"\n",
    "\n",
    "spark_conf = (SparkConf()\n",
    "                .setAppName(\"BatchJob - Data loader\")\n",
    "                .setIfMissing(\"spark.master\", \"local\")\n",
    "                .set(\"spark.cassandra.connection.host\", cassandra_host)\n",
    "                .set(\"spark.cassandra.connection.port\", 9042)\n",
    "                .set(\"spark.sql.shuffle.partitions\", 10)\n",
    "               )\n",
    "\n",
    "# Create spark session\n",
    "spark = (SparkSession\n",
    "         .builder\n",
    "         .config(conf = spark_conf)\n",
    "         .getOrCreate()\n",
    "        )\n",
    "sc = spark.sparkContext\n",
    "sql = spark.sql\n",
    "print(sc.uiWebUrl)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import libraries for Spark SQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The output of the data generator is saved under the base path."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "base_path = \"file:///home/cloudera/notebooks/RandomDataGenerator-master/target/\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load customers.json into the customer table in Cassandra. The nested `address` column is dropped before writing. Remember to include the Spark Cassandra connector when starting Spark; otherwise, Spark will not be able to interact with Cassandra."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+---+----------+--------------------+----------+------+------------+--------------+\n",
      "|             address|age|       dob|               email|first_name|gender|          id|     last_name|\n",
      "+--------------------+---+----------+--------------------+----------+------+------------+--------------+\n",
      "|[Brooklyn,Kings,L...| 19|1999-09-09| iforell@hotmail.com|    Ivania|     M|800000000000|        Forell|\n",
      "|[Bronx,Bronx,LF00...| 63|1955-06-23|jodena.tetreau@ms...|    Jodena|     F|800000000001|       Tetreau|\n",
      "|[Merrick,Nassau,L...| 73|1945-02-05|     sshoyko@msn.com|   Shawnic|     F|800000000002|        Shoyko|\n",
      "|[New York,New Yor...| 67|1950-11-19|jerzei.berardo@gm...|    Jerzei|     F|800000000003|       Berardo|\n",
      "|[Saratoga Springs...| 28|1990-06-05|einzinger@hotmail...|   Elinora|     M|800000000004|      Inzinger|\n",
      "|[Rome,Oneida,LF00...| 34|1984-09-28|bschillinglaw@msn...|   Braylyn|     F|800000000005|  Schillinglaw|\n",
      "|[New York,New Yor...| 61|1957-01-24| hsherwood@gmail.com|     Hagan|     M|800000000006|      Sherwood|\n",
      "|[Armonk,Westchest...| 71|1947-10-16|kejon.giner@hotma...|     Kejon|     M|800000000007|         Giner|\n",
      "|[New York,New Yor...| 31|1987-07-23|shyanne.leut@gmai...|   Shyanne|     M|800000000008|          Leut|\n",
      "|[Rensselaer,Renss...| 45|1972-11-30|jeaninne.gollakis...|  Jeaninne|     F|800000000009|Gollakistagari|\n",
      "|[Suffern,Rockland...| 62|1956-01-21|akupferschmid@gma...|    Audrii|     M|800000000010|  Kupferschmid|\n",
      "|[Hicksville,Nassa...| 39|1979-03-27|   ischbel@gmail.com|     Illya|     F|800000000011|       Schöbel|\n",
      "|[Westbury,Nassau,...| 45|1973-07-14| mboehne@hotmail.com|    Matten|     M|800000000012|        Boehne|\n",
      "|[Sea Cliff,Nassau...| 64|1954-03-09|ariadny.dutoit@ms...|   Ariadny|     M|800000000013|       Du toit|\n",
      "|[Webster,Monroe,L...| 32|1986-02-28|echhatralia@hotma...|Elizabethe|     F|800000000014|    Chhatralia|\n",
      "|[Ft. Drum,Jeffers...| 70|1947-12-30|mmichael.dchting@...|  Mmichael|     M|800000000015|      Düchting|\n",
      "|[Jamaica,Queens,L...| 35|1983-07-04|juley.leugemors@m...|     Juley|     M|800000000016|     Leugemors|\n",
      "|[New York,New Yor...| 18|2000-07-18|lavontae.kruschel...|  Lavontae|     M|800000000017|      Kruschel|\n",
      "|[Forest Hills,Que...| 62|1956-07-23|    jjinchul@msn.com|       Jio|     M|800000000018|      Jin chul|\n",
      "|[Bronx,Bronx,LF00...| 74|1943-11-27|    cdelalex@msn.com|       Car|     F|800000000019|       Delalex|\n",
      "+--------------------+---+----------+--------------------+----------+------+------------+--------------+\n",
      "only showing top 20 rows\n",
      "\n",
      "root\n",
      " |-- address: struct (nullable = true)\n",
      " |    |-- city: string (nullable = true)\n",
      " |    |-- county: string (nullable = true)\n",
      " |    |-- id: string (nullable = true)\n",
      " |    |-- postal_code: double (nullable = true)\n",
      " |    |-- state: string (nullable = true)\n",
      " |    |-- street: string (nullable = true)\n",
      " |-- age: long (nullable = true)\n",
      " |-- dob: string (nullable = true)\n",
      " |-- email: string (nullable = true)\n",
      " |-- first_name: string (nullable = true)\n",
      " |-- gender: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- last_name: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# The JSON reader always infers the schema, so no inferSchema option is needed.\n",
    "customers = spark.read.json(base_path + \"customers.json\")\n",
    "customers.show()\n",
    "customers.printSchema()\n",
    "(customers\n",
    "    .drop(\"address\")\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .format(\"org.apache.spark.sql.cassandra\")\n",
    "    .options(table = \"customer\", keyspace = \"cc\")\n",
    "    .save())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a utility function to return a dataframe based on a cassandra table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "def cass_table(table_name):\n",
    "    return (spark\n",
    "        .read\n",
    "        .format(\"org.apache.spark.sql.cassandra\")\n",
    "        .options(table = table_name, keyspace = \"cc\")\n",
    "        .load())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "|          id|address|age|amount_lower_threshold|amount_upper_threshold|       dob|               email|first_name|gender|    last_name|\n",
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "|800000002686|   null| 79|                  null|                  null|1939-03-15|clea.eckenbrecht@...|      Clea|     M|  Eckenbrecht|\n",
      "|800000009014|   null| 22|                  null|                  null|1996-03-29|deondrick.manopin...| Deondrick|     F|  Manopinives|\n",
      "|800000003508|   null| 33|                  null|                  null|1985-04-26|jchristophides@ms...|  Jazabell|     F|Christophides|\n",
      "|800000009097|   null| 74|                  null|                  null|1944-05-26|kiptyn.chato@hotm...|    Kiptyn|     F|        Chato|\n",
      "|800000003132|   null| 76|                  null|                  null|1942-06-25|dijohn.parette@ms...|    Dijohn|     M|      Parette|\n",
      "|800000003917|   null| 76|                  null|                  null|1942-08-02|beunca.zwiebel@gm...|    Beunca|     M|      Zwiebel|\n",
      "|800000008993|   null| 55|                  null|                  null|1963-03-29|tyanah.hudnall@ho...|    Tyanah|     F|      Hudnall|\n",
      "|800000004685|   null| 89|                  null|                  null|1929-07-10| jgoddyn@hotmail.com|  Jaimarie|     F|       Goddyn|\n",
      "|800000008134|   null| 20|                  null|                  null|1998-09-25| sdrutel@hotmail.com|     Solie|     F|       Drutel|\n",
      "|800000003348|   null| 73|                  null|                  null|1945-05-11|   kbeddle@gmail.com|     Kavan|     M|       Beddle|\n",
      "|800000002869|   null| 67|                  null|                  null|1950-12-10|ashaun.kolymbos@m...|    Ashaun|     F|     Kolymbos|\n",
      "|800000008300|   null| 19|                  null|                  null|1999-07-31|thedstrm@hotmail.com|      Tova|     F|     Hedström|\n",
      "|800000003180|   null| 56|                  null|                  null|1962-10-05| tboxall@hotmail.com|    Tanesa|     M|       Boxall|\n",
      "|800000008074|   null| 73|                  null|                  null|1945-04-28|   mcuwalski@msn.com|    Mahlon|     F|     Cuwalski|\n",
      "|800000008345|   null| 30|                  null|                  null|1987-12-11|aylani.ciamarra@h...|    Aylani|     F|     Ciamarra|\n",
      "|800000000489|   null| 25|                  null|                  null|1993-09-26|  kalamgir@gmail.com|    Kelvan|     M|      Alamgir|\n",
      "|800000003565|   null| 80|                  null|                  null|1938-08-26|gerome.hoepfner@h...|    Gerome|     F|     Hoepfner|\n",
      "|800000002892|   null| 72|                  null|                  null|1946-01-10|yenna.wagley@hotm...|     Yenna|     F|       Wagley|\n",
      "|800000005866|   null| 32|                  null|                  null|1985-12-15| lhuckabay@gmail.com|    Lethea|     F|     Huckabay|\n",
      "|800000009275|   null| 54|                  null|                  null|1964-02-17|dacarri.dodan@hot...|   Dacarri|     M|        Dodan|\n",
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cass_table(\"customer\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load merchants.json into the merchant table in Cassandra."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+--------------------+\n",
      "|          id|                name|\n",
      "+------------+--------------------+\n",
      "|DZ0000000000|iShares 7-10 Year...|\n",
      "|DZ0000000001|National American...|\n",
      "|DZ0000000002|Jensyn Acquistion...|\n",
      "|DZ0000000003|     Interface, Inc.|\n",
      "|DZ0000000004| FTD Companies, Inc.|\n",
      "|DZ0000000005|NextDecade Corpor...|\n",
      "|DZ0000000006|  MakeMyTrip Limited|\n",
      "|DZ0000000007|Dynavax Technolog...|\n",
      "|DZ0000000008|        HyreCar Inc.|\n",
      "|DZ0000000009|Highland/iBoxx Se...|\n",
      "|DZ0000000010|  Liberty Global plc|\n",
      "|DZ0000000011|Green Plains Part...|\n",
      "|DZ0000000012|  Vertex Energy, Inc|\n",
      "|DZ0000000013|     Fuel Tech, Inc.|\n",
      "|DZ0000000014|    TiVo Corporation|\n",
      "|DZ0000000015|        Cerecor Inc.|\n",
      "|DZ0000000016|    SMTC Corporation|\n",
      "|DZ0000000017|Magellan Health, ...|\n",
      "|DZ0000000018|      Check-Cap Ltd.|\n",
      "|DZ0000000019|Village Bank and ...|\n",
      "+------------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n",
      "root\n",
      " |-- id: string (nullable = true)\n",
      " |-- name: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "merchants = spark.read.json(base_path + \"merchants.json\")\n",
    "merchants.show()\n",
    "merchants.printSchema()\n",
    "(merchants\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .format(\"org.apache.spark.sql.cassandra\")\n",
    "    .options(table = \"merchant\", keyspace = \"cc\")\n",
    "    .save())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-------+--------------------+------+\n",
      "|          id|address|                name|ticker|\n",
      "+------------+-------+--------------------+------+\n",
      "|DZ0000000059|   null|Synergy Pharmaceu...|  null|\n",
      "|DZ0000000046|   null|Northern Trust Co...|  null|\n",
      "|DZ0000000020|   null|Gladstone Investm...|  null|\n",
      "|DZ0000000093|   null|Golden Ocean Grou...|  null|\n",
      "|DZ0000000008|   null|        HyreCar Inc.|  null|\n",
      "|DZ0000000090|   null|Allegiance Bancsh...|  null|\n",
      "|DZ0000000027|   null| iShares Asia 50 ETF|  null|\n",
      "|DZ0000000019|   null|Village Bank and ...|  null|\n",
      "|DZ0000000043|   null|       Popular, Inc.|  null|\n",
      "|DZ0000000074|   null|1347 Property Ins...|  null|\n",
      "|DZ0000000065|   null|Invesco Dividend ...|  null|\n",
      "|DZ0000000025|   null|RXi Pharmaceutica...|  null|\n",
      "|DZ0000000037|   null|Sierra Wireless, ...|  null|\n",
      "|DZ0000000041|   null|  Cool Holdings Inc.|  null|\n",
      "|DZ0000000054|   null|Willis Lease Fina...|  null|\n",
      "|DZ0000000058|   null|Priority Technolo...|  null|\n",
      "|DZ0000000038|   null|Mitcham Industrie...|  null|\n",
      "|DZ0000000089|   null|  QCR Holdings, Inc.|  null|\n",
      "|DZ0000000062|   null|        Xilinx, Inc.|  null|\n",
      "|DZ0000000009|   null|Highland/iBoxx Se...|  null|\n",
      "+------------+-------+--------------------+------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cass_table(\"merchant\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Load transactions.json into the transactions table. Here we apply a basic transformation to the timestamp column, converting it from a raw epoch value to a readable timestamp."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+--------+------------+------------+------------+-------------------+\n",
      "|            amount|category| customer_id|          id| merchant_id|          timestamp|\n",
      "+------------------+--------+------------+------------+------------+-------------------+\n",
      "| 802.7103986581193|     atm|800000002081|690000000000|DZ0000000085|2018-08-31 16:45:00|\n",
      "|507.74628999676526|     web|800000005031|690000000001|DZ0000000058|2018-07-16 01:53:31|\n",
      "| 606.4194934089878|     pos|800000003872|690000000002|DZ0000000063|2018-05-17 14:57:21|\n",
      "| 319.4347823556573|     web|800000004311|690000000003|DZ0000000065|2018-08-07 19:28:49|\n",
      "| 6.499356159099801|  mobile|800000001603|690000000004|DZ0000000061|2018-07-05 13:25:49|\n",
      "|1202.5906039488204|     web|800000008644|690000000005|DZ0000000045|2018-09-24 08:59:37|\n",
      "| 334.1742337330727|  mobile|800000009101|690000000006|DZ0000000023|2018-05-27 18:44:58|\n",
      "|1276.2935468125615|     atm|800000003658|690000000007|DZ0000000003|2018-08-18 22:33:10|\n",
      "|1014.0822024764368|     pos|800000003095|690000000008|DZ0000000064|2018-05-24 11:06:38|\n",
      "| 1361.068480696082|  mobile|800000008742|690000000009|DZ0000000024|2018-05-30 22:41:40|\n",
      "|1148.2079577326738|  mobile|800000008958|690000000010|DZ0000000096|2018-05-05 17:19:35|\n",
      "| 1289.624614829537|     atm|800000001963|690000000011|DZ0000000076|2018-08-02 00:33:29|\n",
      "| 222.7393839391672|  mobile|800000003442|690000000012|DZ0000000038|2018-07-26 23:05:54|\n",
      "|157.11972843897124|     pos|800000008979|690000000013|DZ0000000050|2018-08-21 04:45:36|\n",
      "| 997.6210235932542|     web|800000002526|690000000014|DZ0000000085|2018-05-20 23:18:56|\n",
      "|254.38889279371102|     web|800000003454|690000000015|DZ0000000054|2018-08-05 18:42:20|\n",
      "| 595.7214665802746|     web|800000004784|690000000016|DZ0000000076|2018-09-24 15:07:33|\n",
      "|1187.4798800157707|  mobile|800000005671|690000000017|DZ0000000061|2018-09-19 14:50:35|\n",
      "| 576.0836502821041|     web|800000005306|690000000018|DZ0000000098|2018-08-02 06:13:15|\n",
      "|1408.0770008718202|     atm|800000005365|690000000019|DZ0000000019|2018-06-02 03:20:39|\n",
      "+------------------+--------+------------+------------+------------+-------------------+\n",
      "only showing top 20 rows\n",
      "\n",
      "root\n",
      " |-- amount: double (nullable = true)\n",
      " |-- category: string (nullable = true)\n",
      " |-- customer_id: string (nullable = true)\n",
      " |-- id: string (nullable = true)\n",
      " |-- merchant_id: string (nullable = true)\n",
      " |-- timestamp: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "transactions = (spark\n",
    "                .read\n",
    "                .json(base_path + \"transactions.json\")\n",
    "                .withColumn(\"timestamp\"\n",
    "                    , F.expr(\"from_unixtime(cast(timestamp/pow(10, 9) as bigint))\"))\n",
    "               )\n",
    "transactions.show()\n",
    "transactions.printSchema()"
   ]
  },
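  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `from_unixtime` expression above divides the raw `timestamp` by 10^9, which suggests that the generator emits epoch nanoseconds. That is an assumption read off the code, not something the data generator documents here. The following is a minimal plain-Python sketch of the same conversion. The helper name `ns_to_timestamp` is hypothetical, and it formats in UTC, whereas Spark's `from_unixtime` uses the session time zone."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime, timezone\n",
    "\n",
    "def ns_to_timestamp(ns):\n",
    "    # Hypothetical helper: epoch nanoseconds -> 'YYYY-MM-DD HH:MM:SS' in UTC.\n",
    "    # Integer division drops the sub-second part, like the cast to bigint above.\n",
    "    seconds = ns // 10**9\n",
    "    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')\n",
    "\n",
    "print(ns_to_timestamp(1535733900 * 10**9))"
   ]
  },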
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "(transactions\n",
    "    .write\n",
    "    .mode(\"overwrite\")\n",
    "    .format(\"org.apache.spark.sql.cassandra\")\n",
    "    .options(table = \"transactions\", keyspace = \"cc\")\n",
    "    .save())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>customer_id</th>\n",
       "      <th>timestamp</th>\n",
       "      <th>id</th>\n",
       "      <th>amount</th>\n",
       "      <th>category</th>\n",
       "      <th>flag_ml</th>\n",
       "      <th>flag_threshold</th>\n",
       "      <th>location</th>\n",
       "      <th>location_id</th>\n",
       "      <th>merchant_id</th>\n",
       "      <th>overruled</th>\n",
       "      <th>overruled_comment</th>\n",
       "      <th>overruled_date</th>\n",
       "      <th>score</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>800000002686</td>\n",
       "      <td>2018-10-01 15:31:58</td>\n",
       "      <td>690000204866</td>\n",
       "      <td>2131.528888</td>\n",
       "      <td>web</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>DZ0000000022</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>800000002686</td>\n",
       "      <td>2018-09-28 09:00:29</td>\n",
       "      <td>690000628963</td>\n",
       "      <td>5158.127424</td>\n",
       "      <td>mobile</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>DZ0000000010</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>800000002686</td>\n",
       "      <td>2018-09-25 11:15:10</td>\n",
       "      <td>690000463846</td>\n",
       "      <td>2899.654655</td>\n",
       "      <td>mobile</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>DZ0000000003</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>800000002686</td>\n",
       "      <td>2018-09-25 00:54:22</td>\n",
       "      <td>690000257935</td>\n",
       "      <td>1753.912085</td>\n",
       "      <td>web</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>DZ0000000049</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>800000002686</td>\n",
       "      <td>2018-09-23 00:36:28</td>\n",
       "      <td>690000152367</td>\n",
       "      <td>1528.890501</td>\n",
       "      <td>pos</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>DZ0000000064</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "      <td>None</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "    customer_id           timestamp            id       amount category  \\\n",
       "0  800000002686 2018-10-01 15:31:58  690000204866  2131.528888      web   \n",
       "1  800000002686 2018-09-28 09:00:29  690000628963  5158.127424   mobile   \n",
       "2  800000002686 2018-09-25 11:15:10  690000463846  2899.654655   mobile   \n",
       "3  800000002686 2018-09-25 00:54:22  690000257935  1753.912085      web   \n",
       "4  800000002686 2018-09-23 00:36:28  690000152367  1528.890501      pos   \n",
       "\n",
       "  flag_ml flag_threshold location location_id   merchant_id overruled  \\\n",
       "0    None           None     None        None  DZ0000000022      None   \n",
       "1    None           None     None        None  DZ0000000010      None   \n",
       "2    None           None     None        None  DZ0000000003      None   \n",
       "3    None           None     None        None  DZ0000000049      None   \n",
       "4    None           None     None        None  DZ0000000064      None   \n",
       "\n",
       "  overruled_comment overruled_date score  \n",
       "0              None           None  None  \n",
       "1              None           None  None  \n",
       "2              None           None  None  \n",
       "3              None           None  None  \n",
       "4              None           None  None  "
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cass_table(\"transactions\").limit(5).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Find the standard deviation and average transaction amount for each customer, and define the amount thresholds as one standard deviation above and below the average."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+------------------+------------------+----------------------+----------------------+\n",
      "| customer_id|        amount_avg|        amount_std|amount_upper_threshold|amount_lower_threshold|\n",
      "+------------+------------------+------------------+----------------------+----------------------+\n",
      "|800000003508|4000.8447911541843|2030.5769053432075|     6031.421696497392|    1970.2678858109768|\n",
      "|800000002892|4015.4699165463403|1879.4048226518348|     5894.874739198175|    2136.0650938945055|\n",
      "|800000006164|  4064.89169687135|1818.1123446674483|     5883.004041538798|     2246.779352203902|\n",
      "|800000004876|4330.1780265902435|1915.2435772743434|     6245.421603864586|       2414.9344493159|\n",
      "|800000003869|3424.0908568407763| 1755.596040364472|     5179.686897205248|    1668.4948164763043|\n",
      "|800000004211|4046.3803344300945|1711.4403921994372|     5757.820726629532|    2334.9399422306574|\n",
      "|800000004464| 4270.542190952598|2010.1792401752057|     6280.721431127803|     2260.362950777392|\n",
      "|800000004098|4003.3536470509907|2000.1040938837384|     6003.457740934729|    2003.2495531672523|\n",
      "|800000009189|3550.6984423498975|1869.7504492057158|     5420.448891555613|    1680.9479931441817|\n",
      "|800000007511| 4120.659520250922|1988.7088349054143|     6109.368355156335|    2131.9506853455073|\n",
      "|800000007791| 4083.125821957747|2020.9041914242346|     6104.030013381982|    2062.2216305335123|\n",
      "|800000006972|4089.5271926096743|1767.8125731165621|     5857.339765726237|     2321.714619493112|\n",
      "|800000005888| 4150.598529477467|1948.1137714556191|     6098.712300933086|    2202.4847580218484|\n",
      "|800000009559|3947.6140114201517|2113.7733938219103|    6061.3874052420615|    1833.8406175982414|\n",
      "|800000004305| 3803.861869891568|2012.1649097559832|     5816.026779647551|    1791.6969601355847|\n",
      "|800000007503| 3959.693948521354|2020.6257739933476|     5980.319722514701|    1939.0681745280062|\n",
      "|800000005442| 4070.063498435563|1845.8721265067757|     5915.935624942338|    2224.1913719287872|\n",
      "|800000002664|4043.8792498959906|2073.0365821324635|     6116.915832028454|     1970.842667763527|\n",
      "|800000006866| 4128.311534458804|1965.8643458573945|     6094.175880316198|      2162.44718860141|\n",
      "|800000000568|3883.7409993924985|2007.1696753733509|      5890.91067476585|    1876.5713240191476|\n",
      "+------------+------------------+------------------+----------------------+----------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "agg = (transactions\n",
    "    .groupBy(\"customer_id\")\n",
    "    .agg(F.avg(\"amount\").alias(\"amount_avg\"), F.stddev(\"amount\").alias(\"amount_std\"))\n",
    "    .withColumn(\"amount_upper_threshold\", F.expr(\"amount_avg + amount_std\"))\n",
    "    .withColumn(\"amount_lower_threshold\", F.expr(\"amount_avg - amount_std\"))\n",
    ")\n",
    "\n",
    "agg.show()"
   ]
  },
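  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the threshold rule concrete, here is a minimal plain-Python sketch of the same arithmetic on a made-up list of amounts. The function name `amount_thresholds` and the sample values are illustrative assumptions, not part of the pipeline. `statistics.stdev` computes the sample standard deviation, which matches Spark's `F.stddev` (an alias for `stddev_samp`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import statistics\n",
    "\n",
    "def amount_thresholds(amounts):\n",
    "    # Mean and sample standard deviation (n - 1 denominator).\n",
    "    # statistics.stdev needs at least two values and raises StatisticsError otherwise.\n",
    "    avg = statistics.mean(amounts)\n",
    "    std = statistics.stdev(amounts)\n",
    "    # One standard deviation on either side of the mean, as in the Spark aggregation.\n",
    "    return avg - std, avg + std\n",
    "\n",
    "# Made-up amounts, for illustration only.\n",
    "lower, upper = amount_thresholds([10.0, 20.0, 30.0])\n",
    "print(lower, upper)"
   ]
  },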
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
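    "Update the customer profiles with the threshold values. Cassandra writes are upserts on the primary key `id`, so appending only the threshold columns leaves the other columns of each customer row unchanged."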
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "(agg\n",
    "    .select(\"customer_id\", \"amount_upper_threshold\", \"amount_lower_threshold\")\n",
    "    .withColumnRenamed(\"customer_id\", \"id\")\n",
    "    .write\n",
    "    .mode(\"append\")\n",
    "    .format(\"org.apache.spark.sql.cassandra\")\n",
    "    .options(table = \"customer\", keyspace = \"cc\")\n",
    "    .save())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "|          id|address|age|amount_lower_threshold|amount_upper_threshold|       dob|               email|first_name|gender|    last_name|\n",
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "|800000002686|   null| 79|    1873.6919950506053|     5750.826738407709|1939-03-15|clea.eckenbrecht@...|      Clea|     M|  Eckenbrecht|\n",
      "|800000009014|   null| 22|    2036.0152859916593|     5845.483106360582|1996-03-29|deondrick.manopin...| Deondrick|     F|  Manopinives|\n",
      "|800000003508|   null| 33|    1970.2678858109768|     6031.421696497392|1985-04-26|jchristophides@ms...|  Jazabell|     F|Christophides|\n",
      "|800000009097|   null| 74|    1860.8318397319022|     5882.868594280499|1944-05-26|kiptyn.chato@hotm...|    Kiptyn|     F|        Chato|\n",
      "|800000003132|   null| 76|    2023.0223670188468|     6052.967678561699|1942-06-25|dijohn.parette@ms...|    Dijohn|     M|      Parette|\n",
      "|800000003917|   null| 76|     1875.634377457154|     5795.304678912224|1942-08-02|beunca.zwiebel@gm...|    Beunca|     M|      Zwiebel|\n",
      "|800000008993|   null| 55|     1999.201093657428|     6028.398983898406|1963-03-29|tyanah.hudnall@ho...|    Tyanah|     F|      Hudnall|\n",
      "|800000004685|   null| 89|    2131.9161732982784|     6111.050393765384|1929-07-10| jgoddyn@hotmail.com|  Jaimarie|     F|       Goddyn|\n",
      "|800000008134|   null| 20|    1933.7876369805356|     5861.082308536266|1998-09-25| sdrutel@hotmail.com|     Solie|     F|       Drutel|\n",
      "|800000003348|   null| 73|    2147.3209813886224|     6229.440293829015|1945-05-11|   kbeddle@gmail.com|     Kavan|     M|       Beddle|\n",
      "|800000002869|   null| 67|    2392.9826902513546|     6017.979224551107|1950-12-10|ashaun.kolymbos@m...|    Ashaun|     F|     Kolymbos|\n",
      "|800000008300|   null| 19|     2156.215479886143|     6011.149014560237|1999-07-31|thedstrm@hotmail.com|      Tova|     F|     Hedström|\n",
      "|800000003180|   null| 56|    2067.9690306337525|     5907.207715732501|1962-10-05| tboxall@hotmail.com|    Tanesa|     M|       Boxall|\n",
      "|800000008074|   null| 73|     2103.281039638836|     5822.083484392527|1945-04-28|   mcuwalski@msn.com|    Mahlon|     F|     Cuwalski|\n",
      "|800000008345|   null| 30|    1672.6642605074392|     5399.260842662175|1987-12-11|aylani.ciamarra@h...|    Aylani|     F|     Ciamarra|\n",
      "|800000000489|   null| 25|    2069.7915323791008|     5946.407282457036|1993-09-26|  kalamgir@gmail.com|    Kelvan|     M|      Alamgir|\n",
      "|800000003565|   null| 80|    2230.4123441626293|     5976.986078401234|1938-08-26|gerome.hoepfner@h...|    Gerome|     F|     Hoepfner|\n",
      "|800000002892|   null| 72|    2136.0650938945055|     5894.874739198175|1946-01-10|yenna.wagley@hotm...|     Yenna|     F|       Wagley|\n",
      "|800000005866|   null| 32|    1980.9181616371284|     6043.410205156697|1985-12-15| lhuckabay@gmail.com|    Lethea|     F|     Huckabay|\n",
      "|800000009275|   null| 54|    1682.5190589759486|    5784.1974432796205|1964-02-17|dacarri.dodan@hot...|   Dacarri|     M|        Dodan|\n",
      "+------------+-------+---+----------------------+----------------------+----------+--------------------+----------+------+-------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cass_table(\"customer\").show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Work with the Cassandra table directly from Python. We will use this technique in the streaming application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "from cassandra.cluster import Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Row(amount_lower_threshold=2406.1891696538737)"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "cluster = Cluster([\"localhost\"])\n",
    "cass = cluster.connect(\"cc\")\n",
    "cass.execute(\"select amount_lower_threshold from customer where id = %s\", ('800000009260',)).one()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('800000009260', 2406.1891696538737),\n",
       " ('800000001652', 1999.5693000247313),\n",
       " ('800000005063', 2129.9385858059727)]"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "rdd = sc.parallelize(['800000009260', '800000001652', '800000005063'])\n",
    "\n",
    "def detect_anomalies(tnx):\n",
    "    # Open one Cassandra connection per partition rather than one per record.\n",
    "    cluster = Cluster([\"localhost\"])\n",
    "    cass = cluster.connect(\"cc\")\n",
    "    result = []\n",
    "    try:\n",
    "        for r in tnx:\n",
    "            rec = cass.execute(\"select id, amount_lower_threshold from customer where id = %s\"\n",
    "                               , (r,)).one()\n",
    "            # Skip ids that have no row in the customer table.\n",
    "            if rec is not None:\n",
    "                result.append((rec.id, rec.amount_lower_threshold))\n",
    "    finally:\n",
    "        # Release the connection even if a lookup fails.\n",
    "        cass.shutdown()\n",
    "        cluster.shutdown()\n",
    "    return result\n",
    "\n",
    "rdd.mapPartitions(detect_anomalies).collect()"
   ]
  },
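  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The lookup above only fetches `amount_lower_threshold`; it does not yet compare a transaction against it. As a hedged sketch of that remaining step, the hypothetical helper `is_anomalous` below flags an amount that falls outside a customer's lower and upper thresholds. Treating a missing threshold as not anomalous is an assumption, not something this notebook specifies."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def is_anomalous(amount, lower, upper):\n",
    "    # Assumption: without thresholds (e.g. no transaction history) nothing is flagged.\n",
    "    if lower is None or upper is None:\n",
    "        return False\n",
    "    return amount < lower or amount > upper\n",
    "\n",
    "# Thresholds copied from the customer table output shown earlier (id 800000002686).\n",
    "lower, upper = 1873.6919950506053, 5750.826738407709\n",
    "# The first two amounts appear in that customer's transactions; 6200.0 is made up.\n",
    "for amount in (2131.528888, 5158.127424, 6200.0):\n",
    "    print(amount, is_anomalous(amount, lower, upper))"
   ]
  },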
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
